package com.shujia.flink.tf

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

/**
  * Streaming word-count demo that keys the stream with Flink's Java-style
  * [[KeySelector]] rather than a Scala function literal.
  *
  * Lines are read from a socket at host "master", port 8888, split on commas,
  * turned into `(word, 1)` pairs, keyed by the word, summed on the count field
  * and printed.
  */
object Demo5KeyBy {
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Raw text lines arriving on the socket
    val socketLines: DataStream[String] = env.socketTextStream("master", 8888)

    // One element per comma-separated token
    val tokens: DataStream[String] = socketLines.flatMap(line => line.split(","))

    // Pair every token with an initial count of 1
    val pairs: DataStream[(String, Int)] = tokens.map(token => (token, 1))

    // Java API: the key is extracted by an explicit KeySelector instance
    val firstField: KeySelector[(String, Int), String] = new KeySelector[(String, Int), String] {
      override def getKey(pair: (String, Int)): String = pair._1
    }
    val keyed: KeyedStream[(String, Int), String] = pairs.keyBy(firstField)

    // After keying, aggregate the count field (tuple position 1) per key
    val counts: DataStream[(String, Int)] = keyed.sum(1)

    counts.print()

    env.execute()
  }

}
